import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test

class RDDTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)

    //Read the input files
    val rdd1 = sc.textFile("datas")
    //Split each line into words
    val rdd2 = rdd1.flatMap(item => item.split(" "))
    //Pair each word with an initial count of 1
    val rdd3 = rdd2.map(item => (item, 1))
    //Sum the counts per key (word)
    val rdd4 = rdd3.reduceByKey((curr, agg) => curr + agg)
    //Collect the results and print them
    rdd4.collect().foreach(println)
    sc.stop()
  }

  @Test
  def sparkContext(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
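    //Note: each test builds its own SparkContext; stopping it when the test finishes
    //(as the word-count example above does) avoids conflicts with other tests that
    //create a SparkContext in the same JVM
    sc.stop()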
  }

  //Create an RDD from a local collection
  @Test
  def rddCreationLocal(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    val seq1 = Seq("hello", "world", "HI")
    val seq2 = Seq(1, 2, 3)
    //parallelize: the partition count is optional (it defaults to defaultParallelism)
    val rdd1: RDD[String] = sc.parallelize(seq1, 2)
    //makeRDD is equivalent for a plain Seq; its partition count is likewise optional
    val rdd2: RDD[Int] = sc.makeRDD(Seq(1, 2, 3, 4), 2)
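    //Sanity check (optional addition): both RDDs above were built with 2 partitions
    println(rdd1.partitions.size) //2
    println(rdd2.partitions.size) //2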
  }

  @Test
  //Create an RDD from external data (files)
  def rddCreationFiles(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    val rdd1 = sc.textFile("file_path")
    //1. textFile takes a path (a file, a directory, or an HDFS path)
    //2. for HDFS input, the partitions are determined by the HDFS blocks (and the minPartitions argument)
  }

  @Test
  //Derive an RDD from another RDD
  def rddCreateFromRDD(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(Seq(1, 2, 3))
    //Applying a transformation to an RDD produces a new RDD
    val rdd2 = rdd1.map(item => (item, 1))
  }

  @Test
  def mapTest(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    //1. Create the RDD
    val rdd1 = sc.parallelize(Seq(1, 2, 3))
    //2. Apply the map operation
    val rdd2: RDD[Int] = rdd1.map(item => item * 10)
    //3. Collect the result
    rdd2.collect().foreach(println)
  }

  @Test
  def flatMapTest(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    //Create the RDD
    val rdd1 = sc.parallelize(Seq("1,2,3,4"))
    //Apply the flatMap operation
    val rdd2 = rdd1.flatMap(item => item.split(","))
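    //flatMap flattens the arrays produced by split, so the collect below prints 1, 2, 3, 4 on separate lines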
    rdd2.collect().foreach(println)
  }


  /**
    * The filter operator
    * keeps only the even numbers
    */
  @Test
  def filterTest(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
      .filter(item => item % 2 == 0)
      .collect()
      .foreach(item => println(item))
  }

  /**
    * map processes one record at a time
    * mapPartitions processes a whole partition at a time (as an iterator of records)
    */
  @Test
  def mapPartitionsTest01(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    //1. Generate the data
    sc.parallelize(Seq(1, 2, 3, 4, 5, 6))
      .mapPartitions(iter => {
        //an Iterator can be traversed only once, so materialize it before printing
        //and return a fresh iterator over the same elements
        val items = iter.toList
        items.foreach(item => println(item))
        items.iterator
      })
      .collect()
  }

  @Test
  def mapPartitionsTest02(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    //1. Generate the data
    sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 2)
      .mapPartitions(iter => {
        iter.map(item => item * 10)
      })
      .collect()
      .foreach(item => println(item))
  }

  @Test
  def mapPartitionsWithIndexTest(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 2)
      .mapPartitionsWithIndex((index, iter) => {
        println("index:" + index)
        //materialize the iterator so the elements can be printed here and still returned
        val items = iter.toList
        items.foreach(item => println(item))
        items.iterator
      })
      .collect()
      .foreach(item => println(item))
  }
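
  /**
    * A minimal sketch (not part of the original tests) of why mapPartitions is useful:
    * work that is expensive to set up (e.g. a database connection or a formatter) can
    * be done once per partition instead of once per record. The "setup" here is just a
    * println standing in for such a resource.
    */
  @Test
  def mapPartitionsSetupSketch(): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    val sc = new SparkContext(conf)
    sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 2)
      .mapPartitions(iter => {
        //runs once per partition (2 times in total here)
        println("per-partition setup")
        //runs once per record
        iter.map(item => "value=" + item)
      })
      .collect()
      .foreach(println)
  }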

  /**
    * Shrink a large dataset into a smaller one
    * while losing as little of the dataset's character as possible
    */
  @Test
  def sampleTest(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    //false means sampling without replacement; 0.6 is the fraction to keep
    val rdd2 = rdd1.sample(false, 0.6)
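    //Note: sampling is random, so the output differs between runs and 0.6 is only the
    //expected fraction of elements kept, not an exact count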
    val result = rdd2.collect()
    result.foreach(item => println(item))
  }

  @Test
  def mapValueTest(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
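    //mapValues applies the function to the value of each pair and leaves the key unchanged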
    sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4), ("e", 5), ("f", 6)))
      .mapValues(item => item * 10)
      .collect()
      .foreach(item => println(item))
  }

  @Test
  def unionTest(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(Seq(1, 2, 3))
    val rdd2 = sc.parallelize(Seq(3, 4, 5))
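    //union concatenates the two RDDs without deduplicating, so 3 appears twice below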
    rdd1.union(rdd2)
      .collect()
      .foreach(println)
  }

  @Test
  def intersectionTest(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(Seq(1, 2, 3, 4))
    val rdd2 = sc.parallelize(Seq(3, 4, 5, 6))
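    //intersection returns the distinct elements present in both RDDs: 3 and 4 (order not guaranteed)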
    rdd1.intersection(rdd2)
      .collect()
      .foreach(println)
  }

  @Test
  def subtractTest(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(Seq(1, 2, 3, 4))
    val rdd2 = sc.parallelize(Seq(3, 4, 5, 6))
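    //subtract removes from rdd1 every element that also appears in rdd2, leaving 1 and 2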
    rdd1.subtract(rdd2)
      .collect()
      .foreach(println)
  }

  @Test
  def reduceByKeyTest(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    //Create the RDD
    val rdd1 = sc.parallelize(Seq("hello world", "spark world", "spark hello"))
    //Process the data: split into words, pair with 1, sum the counts per word
    val rdd2 = rdd1.flatMap(item => item.split(" "))
      .map(item => (item, 1))
      .reduceByKey((curr, agg) => curr + agg)
    //Collect and print the result
    val result = rdd2.collect()
    result.foreach(item => println(item))
    //Stop the context
    sc.stop()
  }

  @Test
  def groupByKeyTest(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
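    //groupByKey gathers every value for a key into one collection, e.g. ("a", [1, 5]) and ("c", [3, 4]);
    //when the values can be pre-combined, reduceByKey/combineByKey shuffle less data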
    sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("c", 4), ("a", 5), ("f", 6)))
      .groupByKey()
      .collect()
      .foreach(item => println(item))
  }

  /**
    * combineByKey is the operation underlying groupByKey and reduceByKey
    */
  @Test
  def combinerByKeyTest(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    val rdd1: RDD[(String, Double)] = sc.parallelize(Seq(("a", 89.0), ("a", 90.0), ("b", 89.0), ("a", 86.0), ("b", 99.0)))
    val result = rdd1.combineByKey(
      //the data is partitioned first, then grouped by key within each partition
      //createCombiner turns the first value seen for a key into the initial accumulator
      createCombiner = (curr: Double) => (curr, 1),
      //mergeValue folds the remaining values of that key into the accumulator, within a partition
      mergeValue = (curr: (Double, Int), next: Double) => (curr._1 + next, curr._2 + 1),
      //mergeCombiners merges the per-partition accumulators into the final result
      mergeCombiners = (curr: (Double, Int), agg: (Double, Int)) => (curr._1 + agg._1, curr._2 + agg._2)
    )
    //e.g. ("a", (89.0 + 90.0 + 86.0, 3)), then divide the sum by the count to get the average
    val r = result.map(item => (item._1, item._2._1 / item._2._2))
    r.collect().foreach(println)
  }

  @Test
  def combiner(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    val rdd1: RDD[(String, Double)] = sc.parallelize(Seq(("a", 89.0), ("a", 90.0), ("b", 87.0), ("a", 86.0), ("b", 99.0)))
    val result = rdd1.combineByKey(
      createCombiner = (curr: Double) => (curr, 1),
      mergeValue = (curr: (Double, Int), next: Double) => (curr._1 + next, curr._2 + 1),
      mergeCombiners = (curr: (Double, Int), agg: (Double, Int)) => (curr._1 + agg._1, curr._2 + agg._2)
    )
    result.map(item => (item._1, item._2._1 / item._2._2))
      .collect()
      .foreach(println)
  }
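
  /**
    * A minimal sketch (not part of the original tests) of the earlier note that
    * combineByKey underlies reduceByKey: a reduceByKey-style per-key sum written
    * directly with combineByKey.
    */
  @Test
  def reduceByKeyViaCombineByKey(): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    val sc = new SparkContext(conf)
    sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
      .combineByKey(
        (v: Int) => v, //createCombiner: the first value becomes the accumulator
        (acc: Int, v: Int) => acc + v, //mergeValue: fold further values within a partition
        (a: Int, b: Int) => a + b //mergeCombiners: merge the per-partition accumulators
      )
      .collect()
      .foreach(println) //("a", 4) and ("b", 2)
  }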

  @Test
  def foldByKeyTest(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(Seq(("a", 89), ("a", 90), ("b", 87)))
    rdd1.foldByKey(10)((curr, agg) => curr + agg)
      .collect()
      .foreach(println(_))
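    //With the single partition used here ("local" master), the zero value 10 is added
    //once per key: ("a", 10 + 89 + 90) and ("b", 10 + 87). With more partitions it is
    //applied once per key per partition.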
  }

  /**
    * aggregateByKey is the operation underlying foldByKey
    * aggregateByKey(zeroValue)(seqOp, combOp)
    * zeroValue: the initial value
    * seqOp: applied to each element within a partition, starting from the initial value
    * combOp: merges the per-partition results produced by seqOp
    * suited to "transform first, then aggregate" operations
    */
  @Test
  def aggregateByKey(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local[6]").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0)))
    rdd1.aggregateByKey(0.8)((zeroValue, item) => item * zeroValue, (curr, agg) => curr + agg)
      .collect()
      .foreach(println(_))
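    //Note (added for clarity): the first argument of the seqOp is the running accumulator,
    //not a constant 0.8. The example prints ("手机", 20.0) and ("电脑", 16.0) here only because
    //the 3 records land in separate partitions under local[6], so each value is multiplied by
    //0.8 exactly once. A partition-independent way to express "discount then sum" would be
    //aggregateByKey(0.0)((acc, item) => acc + item * 0.8, (curr, agg) => curr + agg).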
  }

  /**
    * join pairs elements by key
    * a.join(b)
    * keys of a are matched against keys of b, and the matching values are combined
    * as a Cartesian product; keys missing from either side are dropped (inner join)
    */
  @Test
  def joinTest(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local[6]").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1)))
    val rdd2 = sc.parallelize(Seq(("a", 10), ("a", 11), ("a", 12)))
    rdd1.join(rdd2)
      .collect()
      .foreach(println(_))
  }

  /**
    * sortBy can sort by any part of the element
    * sortByKey can only sort by the key
    */
  @Test
  def sortByTest(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local[6]").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    val rdd2 = sc.parallelize(Seq(("a", 89), ("c", 90), ("b", 87)))
    val rdd1 = sc.parallelize(Seq(2, 4, 1, 5, 7, 3))
    //sort rdd1 in ascending order
    rdd1.sortBy(item => item, ascending = true).collect().foreach(println)
    //sort rdd2 by the second element (the value), in descending order
    rdd2.sortBy(item => item._2, ascending = false).collect().foreach(println)
    //sort rdd2 by key
    rdd2.sortByKey().collect().foreach(println)
  }

  /**
    * repartition: reset the number of partitions (up or down; always shuffles)
    *
    * coalesce: reduce the number of partitions; without shuffle it can only go lower than before
    */
  @Test
  def partitionsTest(): Unit = {
    //1. Create the SparkConf
    val conf = new SparkConf().setMaster("local[6]").setAppName("spark_context")
    //2. Create the SparkContext
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(Seq(2, 4, 1, 5, 7, 3), 2)
    //repartition resets the partition count; the new count can be larger or smaller
    println(rdd1.repartition(4).partitions.size)
    val rdd2 = sc.parallelize(Seq(("a", 89), ("c", 90), ("b", 87)), 3)
    //coalesce does not shuffle by default, so it can only reduce the partition count;
    //passing shuffle = true lets it increase the partition count as well
    println(rdd2.coalesce(1).partitions.size)
    println(rdd2.coalesce(4, shuffle = true).partitions.size)
  }
}